package com.tang.rabbitmq.config;
/*
 *   @author Dram
 *   @create 2021-04-16 13:53
 */

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;


@SpringBootConfiguration
public class RabbitmqConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static final String IMMEDIATE_QUEUE = "queue.demo.immediate";//立即消费的队列名称
    public static final String IMMEDIATE_EXCHANGE = "exchange.demo.immediate";//立即消费的exchange
    public static final String IMMEDIATE_ROUTING_KEY = "routingkey.demo.immediate";//立即消费的routing-key 名称
    public static final String DELAY_QUEUE= "queue.demo.delay";//延时消费的队列名称
    public static final String DEAD_LETTER_EXCHANGE = "exchange.demo.delay";//延时消费的exchange
    public static final String DELAY_ROUTING_KEY = "routingkey.demo.delay";//延时消费的routing-key名称


    /**
     * 设置消息可靠投递
     */
    @PostConstruct
    public void initRabbitTemplate(){
        //设置消息到达broker端确认回调方法 全局
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("ConfirmCallback...... RabbitTemplate=["+correlationData+"],b=["+b+"],s=["+s+"]");
            }
        });
        //设置消息从交换机到队列的确认回调方法
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println("ReturnsCallback.... returnedMessage=["+returnedMessage+"] ");
            }
        });


    }


    // 配置一个工作模型队列
    @Bean
    public Queue queueWork1() {
        return new Queue("queue_work");
    }

    // 发布订阅模式
// 声明两个队列
    @Bean
    public Queue queueFanout1() {
        return new Queue("queue_fanout1");
    }
    @Bean
    public Queue queueFanout2() {
        return new Queue("queue_fanout2");
    }
    // 准备一个交换机
    @Bean
    public FanoutExchange exchangeFanout() {
        return new FanoutExchange("exchange_fanout");
    }
    // 将交换机和队列进行绑定
    @Bean
    public Binding bindingExchange1() {
        return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());
    }
    @Bean
    public Binding bindingExchange2() {
        return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());
    }


    // topic 模型
    @Bean
    public Queue queueTopic1() {
        return new Queue("queue_topic1");
    }
    @Bean
    public Queue queueTopic2() {
        return new Queue("queue_topic2");
    }
    @Bean
    public TopicExchange exchangeTopic() {
        return new TopicExchange("exchange_topic");
    }
    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(queueTopic1()).to(exchangeTopic()).with("topic.#");
    }
    @Bean
    public Binding bindingTopic2() {
        return BindingBuilder.bind(queueTopic2()).to(exchangeTopic()).with("topic.*");
    }
    // 测试confirm 机制，专门创建了一个队列
    @Bean
    public Queue queueConfirm() {
        return new Queue("queue_confirm");
    }
    // 测试return机制
    @Bean
    public Queue queueReturn() {

        return new Queue("queue_return");
    }
    @Bean
    public TopicExchange exchangeReturn() {
        return new TopicExchange("exchange_return");
    }
    @Bean
    public Binding bindingReturn() {
        return BindingBuilder.bind(queueReturn()).to(exchangeReturn()).with("return.*");
    }


    // 创建一个立即消费队列
    @Bean
    public Queue immediateQueue() {
        // 第一个参数是创建的queue的名字，第二个参数是是否支持持久化
        return new Queue(IMMEDIATE_QUEUE, true);
    }

    // 创建一个延时队列
    @Bean
    public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();
        //设置消息的存活时间
        params.put("x-message-ttl", 1000*20);
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称，
        params.put("x-dead-letter-exchange", IMMEDIATE_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", IMMEDIATE_ROUTING_KEY);
        return new Queue(DELAY_QUEUE, true, false, false, params);
    }

    @Bean
    public DirectExchange immediateExchange() {
        // 一共有三种构造方法，可以只传exchange的名字， 第二种，可以传exchange名字，是否支持持久化，是否可以自动删除，
        //第三种在第二种参数上可以增加Map，Map中可以存放自定义exchange中的参数
        return new DirectExchange(IMMEDIATE_EXCHANGE, true, false);
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        // 一共有三种构造方法，可以只传exchange的名字， 第二种，可以传exchange名字，是否支持持久化，是否可以自动删除，
        // 第三种在第二种参数上可以增加Map，Map中可以存放自定义exchange中的参数
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }

    //把立即消费的队列和立即消费的exchange绑定在一起
    @Bean
    public Binding immediateBinding() {
        return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(IMMEDIATE_ROUTING_KEY);
    }

    //把延时消费的队列和延时消费的exchange绑定在一起
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(DELAY_ROUTING_KEY);
    }
}
